Creating Curve Ensemble Summaries#
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import io
import os
import gzip
import plotly.express as px
from tqdm.notebook import tqdm
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.colors as mcolors
import similaritymeasures
import plotly.graph_objs as go
from time import time
import matplotlib.pyplot as plt
import random as rand
import warnings
warnings.filterwarnings('ignore')
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[1], line 1
----> 1 from google.cloud import storage
2 from google.cloud import bigquery
3 from google.oauth2 import service_account
ModuleNotFoundError: No module named 'google'
pip show google-cloud-storage
Name: google-cloud-storage
Version: 2.18.2
Summary: Google Cloud Storage API client library
Home-page: https://github.com/googleapis/python-storage
Author: Google LLC
Author-email: googleapis-packages@google.com
License: Apache 2.0
Location: C:\Users\elija\anaconda3\Lib\site-packages
Requires: google-api-core, google-auth, google-cloud-core, google-crc32c, google-resumable-media, requests
Required-by:
Note: you may need to restart the kernel to use updated packages.
WARNING: Ignoring invalid distribution ~rpcio (C:\Users\elija\anaconda3\Lib\site-packages)
# get our query building function from the other file
from bin_builder import build_country_query
build_country_query
<function bin_builder.build_country_query(table, country_ids, run_ids, min_age, max_age, categories, grouped=True)>
# Limit OpenMP to one thread: works around a known scikit-learn KMeans
# memory-leak warning on Windows MKL builds
os.environ['OMP_NUM_THREADS'] = '1'
# Service account identity (informational; actual auth uses the key file below)
service_account_id = 'elijahsandler@net-data-viz-handbook.iam.gserviceaccount.com'
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'C:\\Users\\elija\\Documents\\24f-coop\\net-data-viz-handbook-fe2c5531555d.json'
# Load explicit credentials from the local service-account key file
credentials = service_account.Credentials.from_service_account_file('C:\\Users\\elija\\Documents\\24f-coop\\net-data-viz-handbook-fe2c5531555d.json')
# GCP project that owns the BigQuery dataset queried below
project = 'net-data-viz-handbook'
Import data from BigQuery#
# construct query
# NOTE: since the rest of this file is about curve ensemble summaries, we import all runs of one country for a specific age
# group, but you can ask for as many countries and runs as you want
table = 'net-data-viz-handbook.sri_data.SIR_0_countries_incidence_daily'
# One country, all 100 simulation runs, ages 13-23, infectious counts only
country_ids = [215]
run_ids = list(range(1, 101))
min_age = 13
max_age = 23
categories = ['Infectious']
# Generate SQL query with grouped=True (default behavior): age bins summed into one column
query_grouped = build_country_query(table, country_ids, run_ids, min_age, max_age, categories, grouped=True)
# Generate SQL query with grouped=False (return individual bins)
query_separate = build_country_query(table, country_ids, run_ids, min_age, max_age, categories, grouped=False)
query_separate
'SELECT date, country_id, run_id, Infectious_13_17 AS infectious_13_17, Infectious_18_23 AS infectious_18_23 FROM `net-data-viz-handbook.sri_data.SIR_0_countries_incidence_daily` WHERE country_id IN (215) AND run_id IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100) GROUP BY date, country_id, run_id, Infectious_13_17, Infectious_18_23 ORDER BY date;'
# Initialize a BigQuery client with the explicit service-account credentials
client = bigquery.Client(credentials=credentials, project=project)
# Run the query (grouped variant: one total_infectious column per run)
query_job = client.query(query_grouped)
# Fetch the results into a pandas DataFrame
results = query_job.to_dataframe()
# Display the first few rows
results.head()
| date | country_id | run_id | total_infectious | |
|---|---|---|---|---|
| 0 | 2009-02-17 | 215 | 43 | 0 |
| 1 | 2009-02-17 | 215 | 67 | 0 |
| 2 | 2009-02-17 | 215 | 69 | 0 |
| 3 | 2009-02-17 | 215 | 12 | 0 |
| 4 | 2009-02-17 | 215 | 77 | 0 |
Process data#
# this has to happen in pandas (the SQL result is long-format; plotting wants wide)
# pivoting data. god what a good function.
""" This is how we create a graphing dataframe for plotly. use df.pivot, select your index, column, and values
(which are columns of your original dataframe) """
# Wide format: index = date, one column per run_id, missing values filled with 0
df_pivot = results.reset_index().pivot(index='date', columns='run_id', values='total_infectious').fillna(0)
df_pivot
| run_id | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | ... | 91 | 92 | 93 | 94 | 95 | 96 | 97 | 98 | 99 | 100 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| date | |||||||||||||||||||||
| 2009-02-17 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2009-02-18 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2009-02-19 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2009-02-20 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2009-02-21 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2010-02-13 | 12994 | 0 | 13649 | 14039 | 14690 | 0 | 14518 | 13564 | 14038 | 11972 | ... | 0 | 14212 | 13497 | 13493 | 16202 | 0 | 0 | 0 | 0 | 15322 |
| 2010-02-14 | 12483 | 0 | 13170 | 13731 | 13947 | 0 | 13623 | 13137 | 13325 | 11363 | ... | 0 | 13747 | 12917 | 13010 | 15525 | 0 | 0 | 0 | 0 | 14676 |
| 2010-02-15 | 19267 | 0 | 20866 | 21565 | 21944 | 0 | 21361 | 20537 | 21205 | 17788 | ... | 0 | 21541 | 20579 | 20459 | 24187 | 0 | 0 | 0 | 0 | 22661 |
| 2010-02-16 | 20027 | 0 | 21190 | 22336 | 22723 | 0 | 21737 | 21302 | 21384 | 18326 | ... | 0 | 22356 | 21225 | 20994 | 24950 | 0 | 0 | 0 | 0 | 24166 |
| 2010-02-17 | 20272 | 0 | 22134 | 23092 | 23239 | 0 | 21920 | 21981 | 21600 | 18567 | ... | 0 | 22944 | 22155 | 21264 | 25321 | 0 | 0 | 0 | 0 | 24572 |
366 rows × 100 columns
plt.plot(df_pivot, lw=.2); # remove this semicolon at your own risk
Group and sort curves#
def distance_between_curves(df, method=similaritymeasures.area_between_two_curves, downsample=1, freq=1):
    """Gets the pairwise distance between curves using various algorithms.
    Note: all curves must share the same domain (index) to use area/AUC metrics.
    Inputs:
        df (dataframe): dataframe where columns are curve values and the index
            is a date-like axis.
        method (func): a distance function from the
            https://pypi.org/project/similaritymeasures/ library
        downsample (int): Factor to downsample the curve values.
            E.g., if set to 2, every second point will be kept.
        freq (float): Fraction of the times two curves will be compared
            E.g. if set to .3, each curve will only be compared to ~30% of other curves
    Returns:
        curve_diff (dict): keys are (column, column) tuples, values are distance between them.
    """
    # Initialize dictionary to store distances
    curve_diff = dict()
    # Data cleanup
    # Convert the index to datetime
    df = df.reset_index(names='date')
    df['date'] = pd.to_datetime(df['date'])
    # Convert the datetime to an integer (number of days since epoch)
    df['date'] = (df['date'] - pd.Timestamp("1970-01-01")).dt.days
    # NOTE(review): this also casts every curve column to int — assumes the
    # curves hold integer counts; confirm before using with float-valued data
    df = df.astype(int)
    df.set_index('date', inplace=True)
    # Downsample the curve values if needed (keep every `downsample`-th row)
    if downsample > 1:
        df = df.iloc[::downsample]
    # Wrap the outer loop in a tqdm progress bar
    iterator = tqdm(df.columns, leave=True)
    # Loop through each unordered pair of curves - computationally intensive part: O(n^2)
    for first_curve in iterator:
        for second_curve in df.columns[df.columns.get_loc(first_curve) + 1:]:
            # Randomly skip pairs so only ~freq of them are compared
            if rand.random() <= freq:
                # method expects each curve as an (n_points, 2) array of (x, y)
                curve_diff[(first_curve, second_curve)] = \
                    method(df[first_curve].reset_index().to_numpy(), df[second_curve].reset_index().to_numpy())
    # Return dictionary
    return curve_diff
fr = distance_between_curves(df_pivot, method=similaritymeasures.mse, downsample=10, freq=.1)
def generate_groups(curve_diff, k):
    """
    Uses k-means clustering to group curves based on their distance scores.
    Inputs:
        curve_diff (dict): Dictionary where keys are tuples representing pairs of curves,
                           and values are the distance between them.
        k (int): Desired number of groups.
    Returns:
        labels (array): Integer labels [0, k) for each curve, ordered by sorted curve name.
    """
    # Get all unique curve names appearing in any compared pair
    elements = sorted(set().union(*curve_diff.keys()))
    n = len(elements)
    # Create a mapping from curves to indices
    element_idx = {elem: idx for idx, elem in enumerate(elements)}
    # Create the distance matrix
    # NOTE(review): pairs skipped by freq < 1 stay at distance 0, which makes
    # un-compared curves look maximally similar — confirm this is intended
    distance_matrix = np.zeros((n, n))
    for (curve1, curve2), score in (curve_diff.items()):
        idx1, idx2 = element_idx[curve1], element_idx[curve2]
        distance_matrix[idx1, idx2] = score
        distance_matrix[idx2, idx1] = score  # Matrix is symmetric
    # Convert distance matrix to similarity matrix (largest distance -> 0)
    similarity_matrix = np.max(distance_matrix) - distance_matrix
    # Fit K-Means, treating each curve's row of similarities as its feature vector
    # NOTE(review): no random_state is set, so labels can differ between runs
    kmeans = KMeans(n_clusters=k)
    labels = kmeans.fit_predict(similarity_matrix)
    return labels
labels = generate_groups(fr, 3)
def get_centrality(curve_diff, labels):
    """
    Computes centrality scores for each curve based on their group assignments.

    A curve's score is the sum of its distances to the other curves in the
    same cluster, so a smaller score means a more central curve.

    Inputs:
        curve_diff (dict): Dictionary where keys are tuples representing pairs of curves,
                           and values are the distance between them.
        labels (array): Array of cluster labels for each curve.
    Returns:
        centrality (dict): Centrality scores for each curve.
    """
    # All curve names, sorted so positions line up with `labels`
    unique_curves = sorted({name for pair in curve_diff for name in pair})
    # Long-form table: one row per compared pair
    df_scores = pd.DataFrame(
        [(a, b, score) for (a, b), score in curve_diff.items()],
        columns=['curve1', 'curve2', 'score'],
    )
    # Every curve starts at zero (covers curves with no within-group pairs)
    centrality = dict.fromkeys(unique_curves, 0)
    for label in set(labels):
        # Names of the curves assigned to this cluster
        members = [unique_curves[i] for i in np.where(labels == label)[0]]
        # Keep only comparisons where both endpoints are in the cluster
        in_group = df_scores[
            df_scores['curve1'].isin(members) & df_scores['curve2'].isin(members)
        ]
        # Sum each member's within-group distances
        for name in members:
            touches = (in_group['curve1'] == name) | (in_group['curve2'] == name)
            centrality[name] = in_group.loc[touches, 'score'].sum()
    return centrality
centrality = get_centrality(fr, labels);
def get_most_central(labels, centrality, percentile=50):
    """
    Gets the most central curves for each group based on centrality scores.

    Inputs:
        labels (array): Array of cluster labels for each curve.
        centrality (dict): Dictionary of centrality scores for each curve
            (lower score = more central).
        percentile (float, optional): Percentile threshold to select the most central curves. Default is 50.
    Returns:
        curves (dict): A dictionary where the keys are group labels and the values are lists of curve names
                       that are the most central in each group.
    """
    # Curve names in the order matching `labels`
    all_curves = list(centrality)
    selected = dict()
    for group in set(labels):
        # Names of the curves clustered into this group
        members = [all_curves[i] for i in np.where(labels == group)[0]]
        # Rank members from most central (lowest score) to least central
        ranked = pd.Series(centrality).loc[members].sort_values()
        # The original .loc slice was end-inclusive, hence the +1 here
        keep = int((percentile / 100) * len(members)) + 1
        selected[group] = ranked.index[:keep].tolist()
    return selected
# NOTE(review): this first call's result is discarded — looks like a leftover cell
get_most_central(labels, centrality);
most_central = get_most_central(labels, centrality)
# Flatten the per-group lists into one list of curve names to plot
c = list()
for cat in most_central.keys():
    c = c + most_central[cat]
def normalize_column(column, flip=True):
    """Min-max scale a Series to [0, 1]; with flip=True, invert the result so
    the smallest input maps to 1 and the largest to 0."""
    scaled = (column - column.min()) / (column.max() - column.min())
    return 1 - scaled if flip else scaled
def color_to_rgba(color_name, value, alpha):
    """Blend a named color toward gray and format it as a plotly-style
    'rgba(r, g, b, a)' string.

    value near 1 keeps the full color; value near 0 washes it out to gray.
    alpha is passed through unchanged as the opacity.
    """
    # Base RGB components of the named color, each in [0, 1]
    base = mcolors.to_rgb(color_name)
    # Amount of gray mixed into every channel (more gray for smaller values)
    gray = (1 - value) * 0.8
    # Blend each channel and scale to 0-255 integers
    channels = [int((component * value + gray) * 255) for component in base]
    return f'rgba({channels[0]}, {channels[1]}, {channels[2]}, {alpha})'
color_to_rgba('cyan', .4, .8)
'rgba(122, 224, 224, 0.8)'
# Normalized centrality: 1 = most central (lowest summed distance), 0 = least
df_info = pd.DataFrame(normalize_column(pd.Series(centrality)))
df_info.columns = ['centrality']
# Cluster label per curve; rows align because both use the sorted curve order
df_info['group'] = labels
df_info
| centrality | group | |
|---|---|---|
| 1 | 0.547639 | 2 |
| 2 | 0.342867 | 2 |
| 3 | 0.702482 | 2 |
| 4 | 0.967828 | 2 |
| 5 | 0.981827 | 1 |
| ... | ... | ... |
| 96 | 0.534412 | 2 |
| 97 | 0.807093 | 2 |
| 98 | 0.633335 | 2 |
| 99 | 1.000000 | 0 |
| 100 | 0.892521 | 2 |
100 rows × 2 columns
# Fixed palette: group i -> colors[i]
colors = ['red', 'blue', 'green', 'cyan', 'yellow', 'gray']
cmap = dict(zip(range(len(colors)), colors))
fig = go.Figure()
# Create a set to track groups already added to the legend
legend_groups = set()
# select graphing df
df_roll = df_pivot.rolling(7).median().dropna() # one week rolling median
# by indexing with [c], we graph only the most central curves from each group (calculated above)
df_graph = df_roll[c]
# get prevalence of each group, so groups with low number of curves aren't overrepresented
alphas = {i: df_info['group'].value_counts()[i] / df_info['group'].value_counts().sum() for i in np.unique(labels)}
# Scale linearly so the highest value is 1
max_value = max(alphas.values())
scaled_alphas = {i: value / max_value for i, value in alphas.items()}
# Draw least-central curves first so the most central end up on top
for curve in df_info.T[df_graph.columns].T.sort_values('centrality', ascending=True).index:
    c_gr = df_info['group'][curve]
    c_cen = df_info['centrality'][curve]
    c_alpha = scaled_alphas[c_gr]
    # Check if the group has already been added to the legend
    show_legend = c_gr not in legend_groups
    if show_legend:
        legend_groups.add(c_gr) # Add group to the set
    fig.add_trace(go.Scatter(
        name=f'Group {c_gr}' if show_legend else "", # Only add name if it's the first curve in the group
        x=df_graph.index,
        y=df_graph[curve],
        # Color fades toward gray for less-central curves
        marker=dict(color=color_to_rgba(cmap[c_gr], c_cen, c_alpha)),
        # Line width grows with centrality
        line=dict(width=(c_cen/2)**2),
        mode='lines',
        showlegend=show_legend, # Show legend only for the first curve of the group
        legendgroup=str(c_gr) # Assign to legend group
    ))
fig.show()
Aside: testing the comparison function#
Run this at your own risk — it takes a while. It uses a modified version of the function that returns extra bookkeeping data and omits the progress bar.
def distance_between_curves(df, method=similaritymeasures.mse, downsample=1, freq=1):
    """Gets the pairwise distance between curves using various algorithms.

    Instrumented variant of the function above: no progress bar, and it also
    reports how much work was done so parameter settings can be benchmarked.
    Note: all curves must have same domain to use AUC.
    Inputs:
        df (dataframe): dataframe where columns are curve values.
        method (func): a distance function from the https://pypi.org/project/similaritymeasures/ library
        downsample (int): Factor to downsample the curve values.
            E.g., if set to 2, every second point will be kept.
        freq (float): Fraction of the times two curves will be compared
            E.g. if set to .3, each curve will only be compared to ~30% of other curves
    Returns:
        curve_diff (dict): keys are tuples of curves, values are distance between them.
        num_comparisons (int): total number of curve points processed —
            len(curve) is added once per compared pair, so this is a work
            metric rather than a count of pairs.
    """
    # Initialize dictionary to store distances and counter for comparisons
    curve_diff = dict()
    num_comparisons = 0
    # Downsample before the datetime cleanup (cheaper than after)
    if downsample > 1:
        df = df.iloc[::downsample]
    # Data cleanup: index -> days since epoch, all values cast to int
    df = df.reset_index(names='date')
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = (df['date'] - pd.Timestamp("1970-01-01")).dt.days
    df = df.astype(int)
    df.set_index('date', inplace=True)
    # Compare each unordered pair of columns; O(n^2) in the number of curves
    for first_curve in (df.columns):
        for second_curve in df.columns[df.columns.get_loc(first_curve) + 1:]:
            # Randomly skip pairs so only ~freq of them are compared
            if rand.random() <= freq:
                curve_diff[(first_curve, second_curve)] = \
                    method(df[first_curve].reset_index().to_numpy(), df[second_curve].reset_index().to_numpy())
                # Accumulate points processed for this pair
                num_comparisons += len(df[first_curve])
    return curve_diff, num_comparisons
# Range of values for `freq` and `downsample` to benchmark
freq_values = [.1, .5, 1]
downsample_values = [1, 10, 50]
n = 1 # setting this to anything above 1 is a very bad idea but it's here because I'm using good practices
# NOTE(review): this rebinds `results`, shadowing the BigQuery DataFrame loaded earlier
results = []
# Checkpoint before starting the experiment
print("Starting experiments...")
for freq in freq_values:
    print(f"\nFrequency: {freq} - Starting downsampling experiments...")
    for downsample in downsample_values:
        total_time = 0
        total_comparisons = 0
        # Repeat the experiment `n` times
        for i in range(n):
            start_time = time()
            # Run the distance_between_curves function with current `freq` and `downsample`
            fr, num_comparisons = distance_between_curves(
                df_pivot,
                method=similaritymeasures.curve_length_measure,
                downsample=downsample,
                freq=freq
            )
            # Measure how long it took
            elapsed_time = time() - start_time
            # Accumulate the total time
            total_time += elapsed_time
            total_comparisons += num_comparisons
            # Checkpoint after each run
            # print(f"Run {i + 1}/{n} for freq={freq}, downsample={downsample} - Time: {elapsed_time:.4f}s, Comparisons: {num_comparisons}")
        # Calculate the average time for this combination
        avg_time = total_time / n
        avg_comparisons = total_comparisons / n
        # Append the average result to the list
        results.append({
            'freq': freq,
            'downsample': downsample,
            'time': avg_time,
            'comparisons': avg_comparisons
        })
        # Checkpoint after completing each downsample experiment
        print(f"Completed downsample={downsample} for freq={freq} - Avg Time: {avg_time:.4f}s, Avg Comparisons: {avg_comparisons:.2f}")
# Convert the results to a dataframe for easier plotting
df_results = pd.DataFrame(results)
# Final checkpoint
print("Experiments completed. Results are ready for analysis.")
df_results.head()
# Scatter: elapsed time vs. work done, colored by sampling frequency
px.scatter(df_results, x='time', y='comparisons',
           # size='downsample',
           color='freq',
           title='Number of Comparisons for 100 Curves Using Different Parameters',
           hover_data=['downsample'])
What is a function if not a bunch of cells glued together with duct tape and a prayer?#
This is all of my code from above (minus the cloud stuff and cleaning data) that GPT threw together into a function after a lot of finagling.
def pipeline_curve_analysis(df, method, downsample, freq, k, percentile):
    """
    End-to-end ensemble summary: distances -> k-means groups -> centrality ->
    central-curve selection -> plotly figure, saved to ../images/ as HTML.

    Inputs:
        df (dataframe): wide dataframe, one column per curve (e.g. df_pivot).
        method (func): distance function from the similaritymeasures library.
        downsample (int): keep every `downsample`-th point when comparing.
        freq (float): fraction of curve pairs actually compared.
        k (int): number of k-means groups.
        percentile (float): percentile of most-central curves kept per group.

    NOTE(review): the redefined distance_between_curves above returns a
    (dict, count) tuple, but this call expects the original dict-returning
    version — confirm which definition is live when this cell is executed.
    """
    start = time()
    # Step 1: Get distance between curves
    print("Step 1: Calculating distance between curves...")
    fr = distance_between_curves(df, method=method, downsample=downsample, freq=freq)
    print("Distance calculation complete.")
    # Step 2: Generate groups
    print("Step 2: Generating groups...")
    labels = generate_groups(fr, k)
    print("Group generation complete.")
    # Step 3: Get centrality scores
    print("Step 3: Calculating centrality scores...")
    centrality = get_centrality(fr, labels)
    print("Centrality score calculation complete.")
    # Step 4: Get most central curves
    print("Step 4: Identifying most central curves...")
    most_central = get_most_central(labels, centrality, percentile)
    # Flatten the per-group lists into one list of curve names to plot
    c = list()
    for cat in most_central.keys():
        c = c + most_central[cat]
    # Generate df_info for visualization
    print("Generating df_info for visualization...")
    df_info = pd.DataFrame(normalize_column(pd.Series(centrality)))
    df_info.columns = ['centrality']
    df_info['group'] = labels
    # Prepare the figure
    print("Preparing figure for visualization...")
    colors = ['red', 'blue', 'green', 'cyan', 'yellow', 'gray']
    cmap = dict(zip(range(len(colors)), colors))
    fig = go.Figure()
    legend_groups = set()
    df_roll = df.rolling(7).median().dropna() # One week rolling median
    df_graph = df_roll[c] # Using the most central curves only
    # Per-group share of curves, rescaled so the largest group gets alpha 1
    alphas = {i: df_info['group'].value_counts()[i] / df_info['group'].value_counts().sum() for i in np.unique(labels)}
    max_value = max(alphas.values())
    scaled_alphas = {i: value / max_value for i, value in alphas.items()}
    # Draw least-central curves first so the most central end up on top
    for curve in df_info.T[df_graph.columns].T.sort_values('centrality', ascending=True).index:
        c_gr = df_info['group'][curve]
        c_cen = df_info['centrality'][curve]
        c_alpha = scaled_alphas[c_gr]
        show_legend = c_gr not in legend_groups
        if show_legend:
            legend_groups.add(c_gr)
        fig.add_trace(go.Scatter(
            name=f'Group {c_gr}' if show_legend else "",
            x=df_graph.index,
            y=df_graph[curve],
            # NOTE(review): color is hard-coded to 'maroon' here, unlike the
            # per-group cmap used in the cell version above — confirm intent
            marker=dict(color=color_to_rgba('maroon', c_cen, c_alpha)),
            line=dict(width=(c_cen / 2) ** 2),
            mode='lines',
            showlegend=show_legend,
            legendgroup=str(c_gr)
        ))
    # Title and axis labels
    print("Finalizing figure layout...")
    fig.update_layout(
        title=f'Curve Analysis: method={method.__name__}, downsample={downsample}, freq={freq}, k={k}, percentile={percentile}, \
time={round(time()-start, 4)}sec',
        xaxis_title='Date',
        yaxis_title='Incidence'
    )
    filename = f'../images/ensamble_method-{method.__name__}_downsample-{downsample}_freq-{freq}_k-{k}_percentile-{percentile}.html'
    # Save figure as HTML
    print("Saving figure as HTML file...")
    fig.write_html(filename)
    fig.show()
    print("Process complete.")
# Example of how to call the function
pipeline_curve_analysis(df_pivot, method=similaritymeasures.mse, downsample=1, freq=1, k=3, percentile=50)
Step 1: Calculating distance between curves...
Distance calculation complete.
Step 2: Generating groups...
Group generation complete.
Step 3: Calculating centrality scores...
Centrality score calculation complete.
Step 4: Identifying most central curves...
Generating df_info for visualization...
Preparing figure for visualization...
Finalizing figure layout...
Saving figure as HTML file...
Process complete.